Coded and Uncoded Tasks and Smooth Turn Mobility Control

This document describes how to use the provided task files, and how to use and update the included modules and related functions.


Custom Tasks

It is also possible to create custom tasks, distribute the sub-tasks to the connected Mega mBots, and later reconstruct the overall result from the sub-task results. This takes two steps:

1.) On the client side, create a function named task in a new Python file. Each Mega mBot uses this file to compute the sub-task distributed to it, so the function must take exactly one parameter, which carries the data assigned to that Mega mBot. When the computation is done, it must return the result, which is later collected and reconstructed on the client (a.k.a. the master node). As with the run method of the Mobility class, any module you need must be imported inside the task function. Here is an example that computes an uncoded convolution:

def task(data):
  import numpy as np
  s = data[0]
  A = data[1:1+s]
  X = data[1+s:]
  rs = np.convolve(A, X)
  return rs

2.) On the client side again (the client is also the master node), edit ./master.py to handle data decomposition, result reconstruction, and retrieval of useful information about the task. This requires at least three functions: data_decompose, reconstruct and get_information. The data_decompose function splits the data into small pieces to send to each connected Mega mBot. It takes two parameters: the first is the number of connected Mega mBots, and the second is the state (communication and computation latency) of each Mega mBot. The reconstruct function reassembles the results returned by the Mega mBots, so it takes a single parameter, the list of results. The get_information function returns useful information about the task (e.g., the size of A); it may return None if the needed information is already stored in data_decompose and reconstruct. The following is an example of data decomposition and result reconstruction for uncoded convolution:

import math
import numpy as np

def data_decompose(proc_num, state = None):
  size_of_A, size_of_X = (40000, 20000)
  s = int(math.sqrt((size_of_A * size_of_X) / proc_num))

  np.random.seed(35)
  A = np.random.randint(1000, size=size_of_A).tolist()
  X = np.random.randint(1000, size=size_of_X).tolist()

  iter1 = int(size_of_A / s)
  iter2 = int(size_of_X / s)

  data_list = []
  for i in range(0, iter1):
    for j in range(0, iter2):
      data = [s]
      data += A[i * s:(i + 1) * s]
      data += X[j * s:(j + 1) * s]
      data_list.append(data)

  return data_list

def reconstruct(data_list):
  proc_num = len(data_list)
  size_of_A, size_of_X = (40000, 20000)
  s = int(math.sqrt((size_of_A * size_of_X) / proc_num))

  iter1 = int(size_of_A / s)
  iter2 = int(size_of_X / s)

  n_A = s + size_of_X - 1
  n_Z = len(data_list[0])

  result = []
  for i in range(0, iter1):
    data = []
    partial_result = [0 for i in range(0, n_A)]
    for j in range(0, iter2):
      padding_left = s * j
      padding_right = n_A - n_Z - s * j
      data_temp = data_list[i*iter2+j]
      data_temp = np.pad(data_temp, (padding_left, padding_right), 'constant')
      data.append(data_temp)
    for d in data:
      partial_result = np.add(partial_result, d)
    result.append(partial_result)

  n = size_of_A + size_of_X - 1
  n_Z = len(result[0])

  data = []
  final_result = [0 for i in range(0, n)]
  for j in range(0, len(result)):
    padding_left = s * j
    padding_right = n - n_Z - s * j
    data_temp = np.pad(result[j], (padding_left, padding_right), 'constant')
    data.append(data_temp)

  for d in data:
    final_result = np.add(final_result, d)

  # return final_result
  return np.round(final_result).astype(int)

def get_information():
  pass

Note: If the data returned by the data_decompose function is in numpy array format, be sure to convert it to a list through .tolist().
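The decomposition and reconstruction above amount to convolving block pairs and adding each partial result back at the right offset. The following is a minimal sketch of that idea on toy data, assuming small made-up array sizes and a hypothetical helper named split_convolve; it adds each piece into a slice of the output instead of calling np.pad, which has the same effect as zero-padding and summing.

```python
import numpy as np

def split_convolve(A, X, s):
    """Convolve A and X block by block and overlap-add the pieces."""
    n = len(A) + len(X) - 1
    out = np.zeros(n, dtype=np.int64)
    for i in range(0, len(A), s):
        for j in range(0, len(X), s):
            # This is the computation a single worker would perform.
            piece = np.convolve(A[i:i + s], X[j:j + s])
            # The block pair starting at (i, j) contributes from offset i + j.
            out[i + j:i + j + len(piece)] += piece
    return out

rng = np.random.default_rng(35)           # seed chosen for illustration only
A = rng.integers(0, 1000, size=8)
X = rng.integers(0, 1000, size=4)

blockwise = split_convolve(A, X, s=2)
direct = np.convolve(A, X)
print(np.array_equal(blockwise, direct))  # True
```

Comparing against a direct np.convolve call on small inputs like this is a cheap way to validate a new data_decompose/reconstruct pair before running it on the Mega mBots.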

Using Task Files

1.) Copy-paste the function task from the first step of the Custom Tasks section and save it to a new Python file ./task/uncoded1.py.

2.) Copy-paste all code from the second step of the Custom Tasks section to the Python file ./master.py.

3.) On the client side, run:

$ python3 client.py
Welcome to use Mega mBot control client.

1). Control all Mega mBots in this network
2). Control a single Mega mBot in this network

Select from 1 to 2: 1
Now scan all available Mega mBots...
2022-03-15 15:12:39: Start scanning...
 - 192.168.1.114:8085 is available
 - 192.168.1.150:8085 is available

2022-03-15 15:13:05: Scan completed.
Trying to Connect to 192.168.1.114:8085
Trying to Connect to 192.168.1.150:8085
Connected to all available Mega mBots.
Instruction: task
Task file location: task/uncoded1.py
 - ins to 192.168.1.114: task
 - ins to 192.168.1.150: task
Final Result: [ 581095 863278 1330104 ... 793678 449146 109648]
>> Total time (computation + communication + reconstruction): 3.90215s <<
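Before running a session against real hardware, it can help to call the task function directly on the master with a small hand-built payload. The sketch below assumes the payload layout used by the uncoded example ([s, A block..., X block...]); the sample values are made up for illustration.

```python
def task(data):
    # Imports live inside the function, as required for task files.
    import numpy as np
    s = data[0]           # length of the A block
    A = data[1:1 + s]     # the s values after the header
    X = data[1 + s:]      # the remaining values
    return np.convolve(A, X)

# Hypothetical payload: s = 3, A = [1, 2, 3], X = [4, 5, 6]
payload = [3, 1, 2, 3, 4, 5, 6]
result = task(payload).tolist()
print(result)  # [4, 13, 28, 27, 18]
```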


Appendix 1 - Uncoded Convolution

The idea of uncoded convolution is described in the paper "Coded convolution for parallel and distributed computing within a deadline". The three steps in the Using Task Files section above demonstrate how to use the Mega mBot control system to compute an uncoded convolution with two connected Mega mBots; the console output shown there comes from that setup.

Appendix 2 - Coded Convolution

The idea of coded convolution is described in the paper "Coded convolution for parallel and distributed computing within a deadline". This section demonstrates how to use the Mega mBot control system to compute a coded convolution; the sample session in step 3 uses eight server instances.

1.) Copy-paste the following code and save it to a new Python file ./task/prop.py.

def task(data):
  import numpy as np
  s = int(data[0])
  j = int(data[1])
  index = int(data[2])
  A = data[3:3+s]
  X = data[3+s:]
  rs = np.convolve(A, X)
  rs = np.concatenate((j, index, rs), axis=None)
  return rs

2.) Copy-paste all of the following code into the Python file ./master.py.

import math
import numpy as np

_proc_num = 0

def data_decompose(proc_num, state = None):
  global _proc_num
  _proc_num = proc_num
  info = get_information()
  size_of_A, size_of_X = info['size_of_A'], info['size_of_X']
  n, s = info['n'], info['s']

  np.random.seed(35)
  A = np.random.randint(1000, size=size_of_A).tolist()
  X = np.random.randint(1000, size=size_of_X).tolist()
  iter1 = int(size_of_A / s)
  iter2 = int(size_of_X / s)

  G = generator(n, iter1)

  a_reshape = np.reshape(A, (iter1, s))
  a_hat = np.matmul(G, a_reshape)
  a_hat = a_hat.tolist()
  for i in range(0, n):
    # include index for building decoded matrix
    a_hat[i] = np.concatenate((i, a_hat[i]), axis=None).tolist()

  data_list = []
  for i in range(0, n):
    for j in range(0, iter2):
      data = [s, j]
      data += a_hat[i]
      data += X[j * s:(j + 1) * s]
      data_list.append(data)

  return data_list

def reconstruct(data_list):
  info = get_information()
  size_of_A, size_of_X = info['size_of_A'], info['size_of_X']
  n, s = info['n'], info['s']
  iter1 = int(size_of_A / s)
  iter2 = int(size_of_X / s)
  G = generator(n, iter1)

  # organize data_list
  collections = [[] for i in range(0, iter2)]
  for data in data_list:
    j = int(data[0])
    rs = data[1:]
    collections[j].append(rs)

  # reconstruct results
  collection_2 = []
  for j in range(0, iter2):
    collection = collections[j]
    indexes = []
    results_partial = []
    G_partial = np.array([])
    for result_slice in collection:
      if (len(indexes) == iter1):
        break
      index = int(result_slice[0])
      results_partial.append(result_slice[1:])
      indexes.append(index)

    for index in indexes:
      G_partial = np.append(G_partial, G[index])
    G_partial = np.reshape(G_partial, (len(indexes), iter1))
    G_1 = np.linalg.inv(G_partial)
    result = np.matmul(G_1, results_partial)

    for (i, result_slice) in enumerate(result):
      # padding with 0 for a
      a_total_padding = (len(result) - 1) * s
      a_left = i * s
      a_right = a_total_padding - a_left
      result_slice = np.pad(result_slice, (a_left, a_right), 'constant')

      # padding with 0 for x
      x_total_padding = size_of_A + size_of_X - 1 - len(result_slice)
      x_left = j * s
      x_right = x_total_padding - x_left

      result_slice = np.pad(result_slice, (x_left, x_right), 'constant')
      collection_2.append(result_slice)

  result = np.zeros(size_of_A + size_of_X - 1)
  for result_slice in collection_2:
    result = np.add(result, result_slice)

  # return final_result
  return np.round(result).astype(int)

def get_information():
  size_of_A, size_of_X = (40000, 20000)
  proc_num = _proc_num
  s = int(math.sqrt((size_of_A * size_of_X) / proc_num)) + 1
  iter1 = int(size_of_A / s)
  iter2 = int(size_of_X / s)

  while proc_num != iter1 * iter2 and size_of_A % s != 0 and size_of_X % s != 0:
    s = s + 1
    iter1 = int(size_of_A / s)
    iter2 = int(size_of_X / s)
    if s > size_of_A or s > size_of_X:
      raise ValueError('Please check for the size of A, the size of X and the number of nodes.')

  n = int((proc_num * s) / size_of_X)
  return {
    'size_of_A': size_of_A,
    'size_of_X': size_of_X,
    'n': n,
    's': s
  }


def generator(n, columns):
  start_at = 0
  # step = 0.0065555
  step = 0.15
  end_at = start_at + step * n
  x = np.arange(start_at, end_at, step)
  G = np.vander(x, columns)
  G_t = np.fliplr(G)
  return G_t

def spliter(data):
  return [data[0], data[1:]]

3.) On the client side, run:

$ python3 client.py
Welcome to use Mega mBot control client.

1). Control all Mega mBots in this network
2). Control a single Mega mBot in this network

Select from 1 to 2: 1
Now scan all available Mega mBots...
Trying to Connect to 192.168.1.114:8085
Trying to Connect to 192.168.1.114:8081
Trying to Connect to 192.168.1.114:8082
Trying to Connect to 192.168.1.114:8083
Trying to Connect to 192.168.1.114:8084
Trying to Connect to 192.168.1.114:8086
Trying to Connect to 192.168.1.114:8088
Trying to Connect to 192.168.1.114:8087
Connected to all available Mega mBots.
Instruction: task
Task file location: task/coded.py
 - ins to 192.168.1.114: task
 - ins to 192.168.1.114: task
 - ins to 192.168.1.114: task
 - ins to 192.168.1.114: task
 - ins to 192.168.1.114: task
 - ins to 192.168.1.114: task
 - ins to 192.168.1.114: task
 - ins to 192.168.1.114: task
Final Result: [ 581095 863278 1330104 ... 793678 449146 109648]
>> Total time (computation + communication + reconstruction): 8.25399s <<

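It should be noted that, because only one Mega mBot was available, this run used eight instances of server.py (on different ports) on the same Mega mBot, so it does not reflect the performance of distributed computing. The session was also recorded with the task file saved as task/coded.py; enter the path you actually used in step 1.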
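The decoding step in reconstruct relies on two facts: any iter1 rows of the Vandermonde generator matrix with distinct evaluation points form an invertible matrix, and convolution is linear, so the decoding can be applied to the workers' results rather than to their inputs. The following is a minimal sketch of that idea on toy data. The block sizes, the vector X, and the choice of which workers respond are assumptions made for illustration, not values from master.py.

```python
import numpy as np

def generator(n, columns, step=0.15):
    # Same shape of matrix as in master.py: row i is [1, x_i, x_i^2, ...]
    # with x_i = i * step.
    x = np.arange(n) * step
    return np.vander(x, columns, increasing=True)

k, n = 2, 4                               # 2 source blocks, 4 coded blocks
A_blocks = np.array([[1., 2., 3.],
                     [4., 5., 6.]])
X = np.array([1., -1.])                   # toy second operand

G = generator(n, k)                       # shape (n, k)
coded = G @ A_blocks                      # one coded block per worker
results = np.array([np.convolve(row, X) for row in coded])

# Pretend only workers 1 and 3 answered in time; any k rows are enough.
survivors = [1, 3]
G_sub = G[survivors]
decoded = np.linalg.inv(G_sub) @ results[survivors]

# Decoding the results gives the convolution of each original block with X.
expected = np.array([np.convolve(row, X) for row in A_blocks])
print(np.allclose(decoded, expected))     # True
```

The inverse becomes numerically ill-conditioned as the number of columns grows, which is presumably why the step between evaluation points is a tunable value in generator.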

Appendix 3 - Smooth Turn Mobility Control

The smooth-turn mobility model for airborne networks is described in the paper "A Smooth-Turn Mobility Model for Airborne Networks". The Matlab code provided with the paper can be downloaded here. The following is the smooth-turn mobility control implementation in the Mega mBot Control System:

class Mobility:
  def __init__(self, car):
    self.car = car
    self.V = 500 # Speed
    self.phi = 0 # Heading angle (radian)
    self.delta_T = 6 # Unit time step
    self.reflection = False

  def run(self):
    import time
    expire_after = 60
    expired_time = time.time() + expire_after

    while True:
      if time.time() > expired_time:
        break

      theta = self.generate_theta()
      self.turn_with_angle(theta, self.generate_interval())

    self.car.move_stop()
    time.sleep(0.2)
    print("mega mBot stop!")


  def generate_theta(self, R = None):
    if R is None:
      R = self.generate_radius()
    W = self.V / R
    theta = W * self.delta_T
    return theta

  def generate_radius(self):
    import numpy as np
    varian=3.105e-5 # Spread of the Gaussian variable 1/R, determining the preference between straight trajectories and turns
    R_d_1 = np.random.normal(0, varian) # Note: np.random.normal takes this value as a standard deviation
    R = 1/R_d_1
    return R

  def generate_interval(self):
    import numpy as np
    ExponentialMean = 100 # Mean duration between the changes of turning centers
    interval = np.random.exponential(ExponentialMean) / self.delta_T
    interval = max(3, interval)
    interval = min(60, interval)
    return interval

  def turn_with_angle(self, theta, duration, time_step = 0.3, show_msg = True):
    import time
    angle = abs(theta)
    forward_speed = 50
    turn_speed = forward_speed * angle
    msg = "Turn left with:\n" if theta > 0 else "Turn right with:\n"
    msg += "  - forward_speed: " + str(forward_speed) + "\n"
    msg += "  - turn_speed: " + str(turn_speed) + "\n"
    msg += "  - theta: " + str(theta) + "\n"
    msg += "  - duration: " + str(duration) + "\n"
    if show_msg:
      print(msg)

    stop_time = time.time() + duration
    while time.time() < stop_time:
      if theta > 0:
        self._turn_left(forward_speed, turn_speed, time_step / 2)
      else:
        self._turn_right(forward_speed, turn_speed, time_step / 2)

      # reflect if the boundary is reached
      if self._reflex():
        self.reflection = not self.reflection
        self.car.LED(1, "74aacc")
        self.car.LED(2, "74aacc")
      else:
        self.car.LED_reset(1)
        self.car.LED_reset(2)

  def _turn_left(self, forward_speed, left_speed, time_step = 0.2):
    from time import sleep
    if self.reflection == False:
      self.car.move_forward(forward_speed)
    else:
      self.car.move_backward(forward_speed)
    sleep(time_step)
    self.car.move_left(left_speed)
    sleep(time_step)

  def _turn_right(self, forward_speed, right_speed, time_step = 0.2):
    from time import sleep
    if self.reflection == False:
      self.car.move_forward(forward_speed)
    else:
      self.car.move_backward(forward_speed)
    sleep(time_step)
    self.car.move_right(right_speed)
    sleep(time_step)

  def _reflex(self):
    IR1 = self.car.is_sensing('IR1')
    IR2 = self.car.is_sensing('IR2')
    IR3 = self.car.is_sensing('IR3')
    IS1 = self.car.is_sensing('IS1')
    IS2 = self.car.is_sensing('IS2')
    if IS1 or IS2 or IR1 or IR2 or IR3:
      return True
    return False
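To get a feel for the turn angles and durations the class produces without driving a Mega mBot, the random draws can be reproduced on their own. The sketch below is a hardware-free approximation under stated assumptions: the function name sample_turn and the seed are hypothetical, the default values are copied from the class above, and theta is computed as V * (1/R) * delta_T, which is algebraically the same as (V / R) * delta_T but avoids dividing by a sampled value.

```python
import numpy as np

def sample_turn(rng, V=500, delta_T=6, sigma=3.105e-5, mean_interval=100):
    """Draw one (theta, interval) pair in the way the Mobility class does."""
    inv_radius = rng.normal(0.0, sigma)       # 1/R; near zero means almost straight
    theta = V * inv_radius * delta_T          # turn angle for one unit time step
    interval = rng.exponential(mean_interval) / delta_T
    interval = min(60.0, max(3.0, interval))  # clamp to the range [3, 60]
    return theta, interval

rng = np.random.default_rng(0)                # arbitrary seed for repeatability
samples = [sample_turn(rng) for _ in range(1000)]
thetas = np.array([t for t, _ in samples])
intervals = np.array([d for _, d in samples])

print("left turns:", int((thetas > 0).sum()))
print("right turns:", int((thetas <= 0).sum()))
print("interval range:", intervals.min(), intervals.max())
```

The sign of theta selects between _turn_left and _turn_right, so roughly half of the draws go each way, and the clamped interval is what turn_with_angle receives as its duration.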
